feat: support extension planner for TableScan #20548
linhr wants to merge 5 commits into apache:main
Thanks @linhr for working on this! The overall approach looks clean and well-scoped. One design question: would it be better to try the standard DefaultTableSource path first and fall back to the extension planners only for custom TableSource implementations? Something like:

    LogicalPlan::TableScan(scan) => {
        if let Some(default_source) = scan
            .source
            .as_any()
            .downcast_ref::<DefaultTableSource>()
        {
            // existing TableProvider scan logic
        } else {
            // try extension planners for custom TableSource
            for planner in &self.extension_planners {
                if let Some(plan) = planner
                    .plan_table_scan(self, scan, session_state)
                    .await?
                {
                    return Ok(plan);
                }
            }
            plan_err!(
                "No installed planner was able to plan TableScan for custom TableSource: {:?}",
                scan.table_name
            )
        }
    }

Also, a minor note: the existing plan_extension path validates that the returned ExecutionPlan schema matches the LogicalPlan schema (around line 1649). It might be worth adding a similar check here to catch mismatches early. What do you think?
@goldmedal Thanks for the review!
Good points! Yeah, I agree that having the extension planner as a fallback is indeed better, for the reasons you've mentioned. I have made the change accordingly.
Nice catch! I have added the validation (with a minor refactor to extract the validation logic into a separate function).
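For readers following along, here is a rough sketch of what such an extracted schema check could look like. It is not the code from this PR: `Field`, `Schema`, `field`, and `check_schema_matches` are hypothetical std-only stand-ins for the Arrow/DataFusion types, used only to show the shape of the comparison.

```rust
// Hypothetical stand-ins for the Arrow/DataFusion schema types (illustration only).
#[derive(Debug, Clone, PartialEq)]
pub struct Field {
    pub name: String,
    pub data_type: String,
    pub nullable: bool,
}

#[derive(Debug, Clone, PartialEq)]
pub struct Schema {
    pub fields: Vec<Field>,
}

/// Convenience constructor for the hypothetical `Field` type.
pub fn field(name: &str, data_type: &str, nullable: bool) -> Field {
    Field {
        name: name.to_string(),
        data_type: data_type.to_string(),
        nullable,
    }
}

/// Compares the schema promised by the logical plan with the schema the
/// physical plan produces, and reports the first difference found.
pub fn check_schema_matches(logical: &Schema, physical: &Schema) -> Result<(), String> {
    if logical.fields.len() != physical.fields.len() {
        return Err(format!(
            "schema mismatch: logical plan has {} fields, physical plan has {}",
            logical.fields.len(),
            physical.fields.len()
        ));
    }
    for (i, (l, p)) in logical.fields.iter().zip(&physical.fields).enumerate() {
        if l != p {
            return Err(format!(
                "schema mismatch at field {}: expected {:?}, got {:?}",
                i, l, p
            ));
        }
    }
    Ok(())
}
```

Calling a helper like this from both the plan_extension path and the table scan path keeps the two checks consistent, which is presumably the point of the refactor.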
/// Create a physical plan for a [`LogicalPlan::TableScan`].
///
/// This is useful for planning valid [`TableSource`]s that are not [`TableProvider`]s.
///
It would be great to add a simple example for this method. Something like:
///
/// # Example
///
/// ```rust,ignore
/// use std::sync::Arc;
/// use datafusion::physical_plan::ExecutionPlan;
/// use datafusion::logical_expr::{LogicalPlan, TableScan, UserDefinedLogicalNode};
/// use datafusion::execution::context::SessionState;
/// use datafusion::error::Result;
/// use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
/// use async_trait::async_trait;
///
/// // Your custom table source type
/// struct MyCustomTableSource { /* ... */ }
///
/// // Your custom execution plan
/// struct MyCustomExec { /* ... */ }
///
/// struct MyExtensionPlanner;
///
/// #[async_trait]
/// impl ExtensionPlanner for MyExtensionPlanner {
///     // This planner does not handle user-defined logical nodes
///     async fn plan_extension(
///         &self,
///         _planner: &dyn PhysicalPlanner,
///         _node: &dyn UserDefinedLogicalNode,
///         _logical_inputs: &[&LogicalPlan],
///         _physical_inputs: &[Arc<dyn ExecutionPlan>],
///         _session_state: &SessionState,
///     ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
///         Ok(None)
///     }
///
///     async fn plan_table_scan(
///         &self,
///         _planner: &dyn PhysicalPlanner,
///         scan: &TableScan,
///         _session_state: &SessionState,
///     ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
///         // Check if this is your custom table source
///         if scan.source.as_any().is::<MyCustomTableSource>() {
///             // Create a custom execution plan for your table source
///             let exec = MyCustomExec::new(
///                 scan.table_name.clone(),
///                 Arc::clone(scan.projected_schema.inner()),
///             );
///             Ok(Some(Arc::new(exec)))
///         } else {
///             // Return None to let other extension planners handle it
///             Ok(None)
///         }
///     }
/// }
/// ```
Which issue does this PR close?
TableScan #20547.
Rationale for this change
Please refer to the issue for context. This PR serves as a proof-of-concept and we can consider merging it if we reach consensus on the design discussed in the issue.
What changes are included in this PR?
The trait method ExtensionPlanner::plan_table_scan() is added so that users can define physical planning logic for custom table sources.
Are these changes tested?
The changes are accompanied by unit tests.
Are there any user-facing changes?
Yes, a new trait method is added to ExtensionPlanner. This is not a breaking change since the trait method has a default implementation.
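To illustrate why a default implementation keeps this non-breaking, here is a simplified sketch. It assumes a synchronous, std-only stand-in for the real trait: `ExtensionPlannerSketch`, `PlanResult`, `LegacyPlanner`, `ScanPlanner`, and `plan_scan` are hypothetical names, and plans are modeled as plain strings rather than ExecutionPlan objects.

```rust
// Simplified stand-in: the real trait is async and uses DataFusion types.
pub type PlanResult = Result<Option<String>, String>;

pub trait ExtensionPlannerSketch {
    /// Method that existed before the change; implementors must provide it.
    fn plan_extension(&self, node: &str) -> PlanResult;

    /// Newly added method. The default body means "I cannot plan this scan",
    /// so existing implementors keep compiling without changes.
    fn plan_table_scan(&self, _table_name: &str) -> PlanResult {
        Ok(None)
    }
}

/// A planner written before the new method existed; it is not modified.
pub struct LegacyPlanner;

impl ExtensionPlannerSketch for LegacyPlanner {
    fn plan_extension(&self, node: &str) -> PlanResult {
        Ok(Some(format!("LegacyExec({})", node)))
    }
}

/// A planner that opts in by overriding the new method.
pub struct ScanPlanner;

impl ExtensionPlannerSketch for ScanPlanner {
    fn plan_extension(&self, _node: &str) -> PlanResult {
        Ok(None)
    }

    fn plan_table_scan(&self, table_name: &str) -> PlanResult {
        if table_name == "my_table" {
            Ok(Some(format!("MyCustomExec({})", table_name)))
        } else {
            Ok(None)
        }
    }
}

/// Fallback dispatch: the first planner that returns `Some` wins, and an
/// error is returned if no planner can handle the scan.
pub fn plan_scan(
    planners: &[Box<dyn ExtensionPlannerSketch>],
    table_name: &str,
) -> Result<String, String> {
    for planner in planners {
        if let Some(plan) = planner.plan_table_scan(table_name)? {
            return Ok(plan);
        }
    }
    Err(format!(
        "No installed planner was able to plan TableScan for custom TableSource: {:?}",
        table_name
    ))
}
```

In this sketch, LegacyPlanner inherits the default and is simply skipped during dispatch, while ScanPlanner handles the one table it recognizes.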